package com.bw;

import com.alibaba.fastjson.JSONObject;
import com.bw.gmall.realtime.common.base.BaseApp;
import com.bw.gmall.realtime.common.bean.TrafficHomeDetailPageViewBean;
import com.bw.gmall.realtime.common.constant.Constant;
import com.bw.gmall.realtime.common.funciton.DorisMapFunction;
import com.bw.gmall.realtime.common.util.DateFormatUtil;
import com.bw.gmall.realtime.common.util.FlinkSinkUtil;
import org.apache.flink.api.common.eventtime.SerializableTimestampAssigner;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.common.functions.ReduceFunction;
import org.apache.flink.api.common.state.StateTtlConfig;
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.common.time.Time;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import org.apache.flink.streaming.api.functions.windowing.ProcessAllWindowFunction;
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.util.Collector;

import java.time.Duration;

/**
 * DWS job that counts, per 10-second event-time window, how many devices ({@code mid})
 * viewed the home page and the goods-detail page for the first time on a given day,
 * and writes one aggregated row per window to Doris.
 *
 * <p>Pipeline: page-log ETL -> key by {@code mid} -> per-day de-duplication with keyed
 * state -> watermarks -> global tumbling window -> reduce -> sink.
 */
public class DwsTrafficHomeDetailPageViewWindow extends BaseApp {

    /** {@code page.page_id} value of the home page. */
    private static final String PAGE_HOME = "home";

    /** {@code page.page_id} value of the goods-detail page. */
    private static final String PAGE_GOOD_DETAIL = "good_detail";

    public static void main(String[] args) {
        new DwsTrafficHomeDetailPageViewWindow().start(
                Constant.TOPIC_DWD_TRAFFIC_PAGE,
                Constant.DWS_TRAFFIC_HOME_DETAIL_PAGE_VIEW_WINDOW,
                1,
                10023);
    }

    /**
     * Builds the streaming topology on top of the raw page-log stream.
     *
     * @param env              the execution environment supplied by {@link BaseApp}
     * @param dataStreamSource raw page-log records, one JSON document per element
     */
    @Override
    public void handle(StreamExecutionEnvironment env, DataStreamSource<String> dataStreamSource) {
        // 1. ETL: keep only well-formed home / good_detail page views.
        SingleOutputStreamOperator<JSONObject> etlStream =
                dataStreamSource.flatMap(new PageLogEtlFunction());

        // 2. Key by device id and emit a record only for the first view of each page per day.
        SingleOutputStreamOperator<TrafficHomeDetailPageViewBean> uvStream = etlStream
                .keyBy(new MidKeySelector())
                .process(new DailyFirstVisitFunction());

        // 3. Assign watermarks, window globally and sum the per-device flags.
        SingleOutputStreamOperator<TrafficHomeDetailPageViewBean> reduceStream = uvStream
                .assignTimestampsAndWatermarks(watermarkStrategy())
                .windowAll(TumblingEventTimeWindows.of(
                        org.apache.flink.streaming.api.windowing.time.Time.seconds(10)))
                .reduce(new UvSumFunction(), new WindowMetadataFunction());

        // 4. Write the windowed result to Doris.
        reduceStream
                .map(new DorisMapFunction<>())
                .sinkTo(FlinkSinkUtil.getDorisSink(Constant.DWS_TRAFFIC_HOME_DETAIL_PAGE_VIEW_WINDOW));
    }

    /** Event-time strategy: timestamps come from the bean's {@code ts}, tolerating 3s of disorder. */
    private static WatermarkStrategy<TrafficHomeDetailPageViewBean> watermarkStrategy() {
        return WatermarkStrategy
                .<TrafficHomeDetailPageViewBean>forBoundedOutOfOrderness(Duration.ofSeconds(3))
                .withTimestampAssigner(new SerializableTimestampAssigner<TrafficHomeDetailPageViewBean>() {
                    @Override
                    public long extractTimestamp(TrafficHomeDetailPageViewBean bean, long recordTimestamp) {
                        return bean.getTs();
                    }
                });
    }

    /**
     * Parses raw page logs and forwards only home / goods-detail page views that carry
     * a device id and a numeric timestamp. Malformed records are dropped instead of
     * failing the job.
     */
    private static final class PageLogEtlFunction implements FlatMapFunction<String, JSONObject> {
        @Override
        public void flatMap(String value, Collector<JSONObject> out) {
            JSONObject log;
            try {
                log = JSONObject.parseObject(value);
                if (!isCountablePageView(log)) {
                    return;
                }
            } catch (RuntimeException ignored) {
                // Dirty record (unparseable JSON or wrongly typed field): skip it so that a
                // single bad message cannot put the job into a restart loop.
                return;
            }
            out.collect(log);
        }

        /** Returns true when the log has mid, a numeric ts and a page id this job counts. */
        private static boolean isCountablePageView(JSONObject log) {
            if (log == null) {
                return false;
            }
            JSONObject common = log.getJSONObject("common");
            JSONObject page = log.getJSONObject("page");
            if (common == null || page == null) {
                return false;
            }
            String pageId = page.getString("page_id");
            if (!PAGE_HOME.equals(pageId) && !PAGE_GOOD_DETAIL.equals(pageId)) {
                return false;
            }
            // getLong (rather than getString) so a non-numeric ts is rejected here and not
            // later inside the keyed process function.
            return common.getString("mid") != null && log.getLong("ts") != null;
        }
    }

    /** Extracts the device id; the ETL step guarantees {@code common.mid} is present. */
    private static final class MidKeySelector implements KeySelector<JSONObject, String> {
        @Override
        public String getKey(JSONObject log) {
            return log.getJSONObject("common").getString("mid");
        }
    }

    /**
     * Per device, remembers the last date on which each page was counted and emits a bean
     * with a 1 in the matching counter only when the current record's date differs from it.
     */
    private static final class DailyFirstVisitFunction
            extends KeyedProcessFunction<String, JSONObject, TrafficHomeDetailPageViewBean> {

        /** Date (as formatted by {@code DateFormatUtil.tsToDate}) of the last counted home view. */
        private transient ValueState<String> homeState;

        /** Date (as formatted by {@code DateFormatUtil.tsToDate}) of the last counted detail view. */
        private transient ValueState<String> detailState;

        @Override
        public void open(Configuration parameters) throws Exception {
            // State only matters for the current day, so let it expire one day after the last write.
            StateTtlConfig ttlConfig = StateTtlConfig.newBuilder(Time.days(1L))
                    .setUpdateType(StateTtlConfig.UpdateType.OnCreateAndWrite)
                    .build();

            ValueStateDescriptor<String> homeStateDesc =
                    new ValueStateDescriptor<>("home_state", String.class);
            homeStateDesc.enableTimeToLive(ttlConfig);

            ValueStateDescriptor<String> detailStateDesc =
                    new ValueStateDescriptor<>("detail_state", String.class);
            detailStateDesc.enableTimeToLive(ttlConfig);

            homeState = getRuntimeContext().getState(homeStateDesc);
            detailState = getRuntimeContext().getState(detailStateDesc);
        }

        @Override
        public void processElement(JSONObject log, Context ctx,
                                   Collector<TrafficHomeDetailPageViewBean> out) throws Exception {
            Long ts = log.getLong("ts");
            String curDt = DateFormatUtil.tsToDate(ts);
            String pageId = log.getJSONObject("page").getString("page_id");

            long homeCt = 0L;
            long detailCt = 0L;
            if (PAGE_HOME.equals(pageId)) {
                if (!curDt.equals(homeState.value())) {
                    homeCt = 1L;
                    homeState.update(curDt);
                }
            } else if (PAGE_GOOD_DETAIL.equals(pageId)) {
                // Explicit check: do not rely on the upstream filter to keep other pages out.
                if (!curDt.equals(detailState.value())) {
                    detailCt = 1L;
                    detailState.update(curDt);
                }
            }

            // Repeat visits contribute nothing, so do not send them downstream.
            if (homeCt != 0L || detailCt != 0L) {
                out.collect(TrafficHomeDetailPageViewBean.builder()
                        .homeUvCt(homeCt)
                        .goodDetailUvCt(detailCt)
                        .ts(ts)
                        .build());
            }
        }
    }

    /** Incrementally sums both counters into the first bean of the window. */
    private static final class UvSumFunction implements ReduceFunction<TrafficHomeDetailPageViewBean> {
        @Override
        public TrafficHomeDetailPageViewBean reduce(TrafficHomeDetailPageViewBean acc,
                                                    TrafficHomeDetailPageViewBean next) {
            acc.setHomeUvCt(acc.getHomeUvCt() + next.getHomeUvCt());
            acc.setGoodDetailUvCt(acc.getGoodDetailUvCt() + next.getGoodDetailUvCt());
            return acc;
        }
    }

    /** Stamps the pre-aggregated bean with the window start, window end and date. */
    private static final class WindowMetadataFunction
            extends ProcessAllWindowFunction<TrafficHomeDetailPageViewBean, TrafficHomeDetailPageViewBean, TimeWindow> {
        @Override
        public void process(Context context,
                            Iterable<TrafficHomeDetailPageViewBean> elements,
                            Collector<TrafficHomeDetailPageViewBean> out) {
            TimeWindow window = context.window();
            // The preceding reduce leaves exactly one element per fired window.
            TrafficHomeDetailPageViewBean result = elements.iterator().next();
            result.setStt(DateFormatUtil.tsToDateTime(window.getStart()));
            result.setEdt(DateFormatUtil.tsToDateTime(window.getEnd()));
            // Derive the date from event time (window start) rather than the wall clock so
            // replayed data and windows firing just after midnight get the date of their stt.
            result.setCurDate(DateFormatUtil.tsToDate(window.getStart()));
            out.collect(result);
        }
    }
}
